package com.bw.util;


import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.*;


public class MysqlUtil extends RichSinkFunction<String> {
    Connection conn;
    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        conn = DriverManager.getConnection("jdbc:mysql://hadoop101:3306/Flink?CharacterEncoding=utf-8", "root", "123456");
    }

    @Override
    public void close() throws Exception {
        conn.close();
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        String[] str = value.split(",");
        PreparedStatement prep = conn.prepareStatement("insert into data values(0,?,?,?,?,?,?)");
        prep.setObject(1,str[0]);
        prep.setObject(2,str[1]);
        prep.setObject(3,str[2]);
        prep.setObject(4,str[3]);
        prep.setObject(5,str[4]);
        prep.setObject(6,str[5]);


        prep.executeUpdate();
    }
}
